Initial implementation of Pipeline DLQ #5277

Merged
kkondaka merged 8 commits into opensearch-project:main from kkondaka:dlq-pipeline
Aug 12, 2025

Conversation

@kkondaka
Collaborator

Description

[Describe what this change achieves]

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [X] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

output(records, null);
}

void output(Collection<T> records, PipelineIf failurePipeline);
Member

Should we have a setFailurePipeline method instead? I suspect the sinks are going to need to hang on to this in order to use it later in the code.

Member

Also, rather than passing a PipelineIf, I'd probably pass this to the sinks.

interface FailurePipeline {
  void writeAll(Collection<T> records);
}

All the sinks care about is writing to the pipeline. I don't think they care about the Source object itself. Also, the Source interface doesn't support writing.

I'd even think that the implementation of FailurePipeline would write directly to the Buffer.
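
A minimal sketch of this suggestion, under stated assumptions: the interface is made generic so that `T` resolves, a `BlockingQueue` stands in for the pipeline's Buffer, and `BufferBackedFailurePipeline` and `ExampleSink` are hypothetical names used only for illustration. It is not the code that was merged.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FailurePipelineSketch {
    // Narrow, write-only view of the failure pipeline, as proposed in the review.
    interface FailurePipeline<T> {
        void writeAll(Collection<T> records);
    }

    // Hypothetical implementation: writes straight into a queue that stands in
    // for the failure pipeline's Buffer. The real Buffer API is not modeled here.
    static final class BufferBackedFailurePipeline<T> implements FailurePipeline<T> {
        private final BlockingQueue<T> buffer;

        BufferBackedFailurePipeline(BlockingQueue<T> buffer) {
            this.buffer = buffer;
        }

        @Override
        public void writeAll(Collection<T> records) {
            buffer.addAll(records);
        }
    }

    // Hypothetical sink: it keeps only the write-only interface, not a Source.
    static final class ExampleSink {
        private FailurePipeline<String> failurePipeline;

        void setFailurePipeline(FailurePipeline<String> failurePipeline) {
            this.failurePipeline = failurePipeline;
        }

        // For illustration, records starting with "bad" are treated as failed.
        void output(Collection<String> records) {
            List<String> failed = new ArrayList<>();
            for (String record : records) {
                if (record.startsWith("bad")) {
                    failed.add(record);
                }
            }
            if (failurePipeline != null && !failed.isEmpty()) {
                failurePipeline.writeAll(failed);
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> dlqBuffer = new LinkedBlockingQueue<>();
        ExampleSink sink = new ExampleSink();
        sink.setFailurePipeline(new BufferBackedFailurePipeline<>(dlqBuffer));
        sink.output(List.of("ok-1", "bad-2", "ok-3"));
        System.out.println(dlqBuffer); // only the failed record reaches the queue
    }
}
```

With this shape the sink depends on a single method, which is the point of the comment above: sinks only need to write to the pipeline.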

Collaborator Author

@dlvenable Sinks that want to hang on can set that themselves, right? Why introduce a new API when the existing API can serve the purpose?


import org.opensearch.dataprepper.model.source.Source;

public interface PipelineIf {
Member

What does PipelineIf mean?

Collaborator Author

@dlvenable I guess I meant it to be "Pipeline Interface", as a pointer to the pipeline from Source. I am open to suggestions on the naming.

Collaborator Author

@kkondaka kkondaka Jul 11, 2025

I needed an interface in the data-prepper-api directory so that I can use it in other interfaces in that directory.

public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer {
    static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L);

    static final String DEFAULT_FAILURE_PIPELINE_NAME = "dlq";
Member

I think we should give this a different name.

default_failure_pipeline

Collaborator Author

@dlvenable Raj prefers a well-known name like dlq. My idea was to provide a way to change this name in a future PR.

Member

I think the name dlq already implies something since our DLQs are different from any other concept. So I think a failure pipeline is clearer. Maybe default_dlq_pipeline instead?

Collaborator Author

or maybe just dlq_pipeline?

}

@Override
public void sendFailedEvents(Collection<Record<Event>> records) {
Member

This is good.

* @return FailurePipeline returns failure pipeline
* @since 2.12
*/
default FailurePipeline getFailurePipeline() {
Member

I don't think we need these in the interface. Each Buffer can handle setFailurePipeline as needed.

* @return FailurePipeline returns failure pipeline
* @since 2.12
*/
default FailurePipeline getFailurePipeline() {
Member

You don't need the getter on the interface.

* @return FailurePipeline returns failure pipeline
* @since 2.12
*/
default FailurePipeline getFailurePipeline() {
Member

You don't need this on the interface.

* @return FailurePipeline returns failure pipeline
* @since 2.12
*/
default FailurePipeline getFailurePipeline() {
Member

You don't need this on the interface.

try {
    buffer.writeAll(records, DEFAULT_WRITE_TIMEOUT);
} catch (Exception e) {
    LOG.error("Failed to write to failure pipeline");
Member

Will we hit this if failure pipeline buffer ever gets full?

Collaborator Author

Yes. I guess I could add some retries here, but overall we can't wait here forever.
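
One way to read "some retries, but not forever" is a bounded retry with a per-attempt timeout. The sketch below is only an illustration under assumptions: an `ArrayBlockingQueue` stands in for the failure pipeline's buffer, records are offered one at a time rather than through `writeAll`, and `writeWithRetries`, `maxAttempts`, and `timeoutMillis` are hypothetical names that do not appear in the PR.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedRetryWriteSketch {
    // Offers each record to the buffer, waiting at most timeoutMillis per attempt
    // and giving up on the batch after maxAttempts failed attempts for one record.
    // Returns how many records were accepted before giving up.
    static <T> int writeWithRetries(BlockingQueue<T> buffer, Collection<T> records,
                                    int maxAttempts, long timeoutMillis)
            throws InterruptedException {
        int written = 0;
        for (T record : records) {
            boolean accepted = false;
            for (int attempt = 0; attempt < maxAttempts && !accepted; attempt++) {
                accepted = buffer.offer(record, timeoutMillis, TimeUnit.MILLISECONDS);
            }
            if (!accepted) {
                break; // the buffer stayed full; the caller decides what to log or drop
            }
            written++;
        }
        return written;
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity 2 and no consumer, so the third record can never be accepted.
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2);
        int written = writeWithRetries(buffer, List.of("a", "b", "c"), 3, 10);
        System.out.println(written); // prints 2
    }
}
```

The total wait is capped at roughly `maxAttempts * timeoutMillis` for the record that cannot be written, so the caller never blocks indefinitely on a full buffer.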

@@ -107,7 +107,9 @@ Collection runProcessorsAndProcessAcknowledgements(List<Processor> processors, C
}
} catch (final Exception e) {
LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e);
Member

Maybe we change this to log that it's going to failure pipeline if it's enabled and only log the dropped message when failure pipeline doesn't exist?
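
A rough sketch of that branching, with loud assumptions: the failure pipeline is modeled as a plain `Consumer`, events are strings, and the method returns the message it would log so the choice is easy to inspect. `handleProcessorFailure` is a hypothetical helper, not a method from the PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

public class ProcessorFailureRoutingSketch {
    // Routes the batch to the failure pipeline when one is configured and
    // returns the message that matches what actually happened to the batch.
    static String handleProcessorFailure(Collection<String> batch,
                                         Consumer<Collection<String>> failurePipeline,
                                         Exception cause) {
        if (failurePipeline != null) {
            failurePipeline.accept(batch);
            return "A processor threw an exception. This batch of Events will be sent to the failure pipeline: "
                    + cause.getMessage();
        }
        return "A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: "
                + cause.getMessage();
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        Exception cause = new IllegalStateException("boom");
        // Failure pipeline configured: the batch is forwarded.
        System.out.println(handleProcessorFailure(List.of("e1", "e2"), received::addAll, cause));
        // No failure pipeline: the batch is dropped and the message says so.
        System.out.println(handleProcessorFailure(List.of("e3"), null, cause));
        System.out.println(received);
    }
}
```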

@kkondaka
Collaborator Author

kkondaka commented Aug 4, 2025

I am also thinking that I should rename "FailurePipeline" to "NoSourcePipeline". We may be using this pipeline in other cases as well. @dlvenable what do you think?

Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
@JsonProperty("sink") final List<SinkModel> sinks,
@JsonProperty("workers") final Integer workers,
@JsonProperty("delay") final Integer delay) {
checkArgument(Objects.nonNull(source), "Source must not be null");
Member

Why is source not required anymore? Doesn't even DLQ pipeline have a pipeline source?

Member

In the PipelineModel, it does not.

@kkondaka, do we have validations elsewhere on this?

Collaborator Author

@graytaylor0 DLQ pipelines do not have a configured source because any source, processor, buffer, or sink can send events to the DLQ pipeline. Instead, the source is the new "HeadlessPipelineSource" that I added. This source is created automatically for a DLQ pipeline and is not configurable.

Collaborator Author

@dlvenable Yes, there are validations that fail if a source is not specified.
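
To make the "headless" idea concrete, here is a small sketch of a source that reads from nothing external and only forwards events pushed into it by other components. Everything in it is an assumption for illustration: a `BlockingQueue` stands in for the pipeline buffer, and the `HeadlessSource` class with its `start` and `sendEvents` methods is a simplified stand-in, not the actual `HeadlessPipelineSource` from this PR.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class HeadlessSourceSketch {
    // A source with no input of its own: other components push events into it,
    // and it forwards them to the buffer of the pipeline it belongs to.
    static final class HeadlessSource<T> {
        private volatile BlockingQueue<T> buffer;

        // Called once when the owning pipeline starts and hands over its buffer.
        void start(BlockingQueue<T> buffer) {
            this.buffer = buffer;
        }

        // Returns false if the source has not started or the buffer rejects a
        // record; records offered before the rejection stay in the buffer.
        boolean sendEvents(Collection<T> records) {
            BlockingQueue<T> target = buffer;
            if (target == null) {
                return false;
            }
            for (T record : records) {
                if (!target.offer(record)) {
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        HeadlessSource<String> source = new HeadlessSource<>();
        System.out.println(source.sendEvents(List.of("early"))); // not started yet

        BlockingQueue<String> dlqBuffer = new LinkedBlockingQueue<>();
        source.start(dlqBuffer);
        System.out.println(source.sendEvents(List.of("failed-1", "failed-2")));
        System.out.println(dlqBuffer);
    }
}
```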

for (Map.Entry<String, Pipeline> pipelineEntry : pipelineMap.entrySet()) {
    if (!(pipelineEntry.getKey().equals(failurePipelineName))) {
        pipelineEntry.getValue().setFailurePipeline(failurePipeline);
        acknowledgementsEnabled = acknowledgementsEnabled || pipelineEntry.getValue().areAcknowledgementsEnabled();
Member

Why not just do

 acknowledgementsEnabled = pipelineEntry.getValue().areAcknowledgementsEnabled();

Collaborator Author

If there are 10 sub-pipelines and the first 9 have acks enabled but the 10th doesn't, then the final result would be false! (In fact, this shouldn't happen, but I just want to be sure.) We want to "release events" when acks are enabled.
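
The difference between the two forms can be shown with a small, self-contained example. The map of pipeline names to flags and the method names `anyEnabled` and `lastWins` are hypothetical; only the `||` accumulation mirrors the loop under review.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AcknowledgementFlagSketch {
    // Mirrors the reviewed loop: true if ANY non-failure pipeline has acks enabled.
    static boolean anyEnabled(Map<String, Boolean> ackByPipeline, String failurePipelineName) {
        boolean enabled = false;
        for (Map.Entry<String, Boolean> entry : ackByPipeline.entrySet()) {
            if (!entry.getKey().equals(failurePipelineName)) {
                enabled = enabled || entry.getValue();
            }
        }
        return enabled;
    }

    // The plain-assignment alternative: only the last pipeline visited counts.
    static boolean lastWins(Map<String, Boolean> ackByPipeline, String failurePipelineName) {
        boolean enabled = false;
        for (Map.Entry<String, Boolean> entry : ackByPipeline.entrySet()) {
            if (!entry.getKey().equals(failurePipelineName)) {
                enabled = entry.getValue();
            }
        }
        return enabled;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so "p3" is visited last.
        Map<String, Boolean> acks = new LinkedHashMap<>();
        acks.put("p1", true);
        acks.put("p2", true);
        acks.put("p3", false);
        acks.put("dlq", false);

        System.out.println(anyEnabled(acks, "dlq")); // prints true
        System.out.println(lastWins(acks, "dlq"));   // prints false
    }
}
```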

    numberOfEventsSuccessful.increment(records.size());
    break;
} catch (Exception e) {
    LOG.error(NOISY, "Failed to write to failure pipeline");
Member

We should log the exception message here.

LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e);
if (inputEvents != null) {
    if (pipeline.getFailurePipeline() != null) {
        pipeline.getFailurePipeline().sendEvents(records);
Member

We may still want a log here

LOG.error("A processor threw an exception. This batch of Events will be sent to the pipeline DLQ, and their EventHandles will be released: ", e);

Collaborator Author

@graytaylor0 LOG.error() automatically logs the exception, right?

}

@Test
//@Timeout(value = 2000, unit = TimeUnit.MILLISECONDS)
Member

Please remove this comment.

Collection<Record<Event>> records = mock(Collection.class);
failurePipeline.sendEvents(records);
verify(headlessPipelineSource).sendEvents(records);
//assertThat(testPipeline.areAcknowledgementsEnabled(), equalTo(false));
Member

Please remove.

processorSets.forEach(processorSet -> processorSet.forEach(processor -> {
    assertThat(((TestProcessor)processor).getFailurePipeline(), equalTo(failurePipeline));
}));
for (Sink sink: sinks) {
Member

Assert the size of this collection so that we know this loop runs.

@dlvenable
Member

> I am also thinking that I should rename "FailurePipeline" to "NoSourcePipeline". We may be using this pipeline in other cases as well. @dlvenable what do you think?

I like HeadlessPipeline.

@dlvenable
Member

@kkondaka, the builds are all failing. Please take a look.

Signed-off-by: Kondaka <krishkdk@amazon.com>
Member

@dlvenable dlvenable left a comment

Thanks! This will be a significant improvement to how Data Prepper handles errors!

@kkondaka kkondaka merged commit 81d5dff into opensearch-project:main Aug 12, 2025
46 of 47 checks passed